Skip to content

Conversation

@Reo-LEI
Copy link
Contributor

@Reo-LEI Reo-LEI commented Oct 7, 2021

This PR is completed on the basis of #2731 and trying to fixes #2730. Thanks for the contribution of @openinx.

In this PR, I make RowDataProjection as row data wrapper as this comment #2731 (comment) mentioned and supprot the Map and List type projection.

@Reo-LEI
Copy link
Contributor Author

Reo-LEI commented Oct 21, 2021

@rdblue @kbendick @openinx @stevenzwu Could you take a look of this again? 😄

)
))
);
AssertHelpers.assertThrows("Should be error because cannot project a partial nested list element.",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: This is a little confusing for me at first.

Can we possibly rephrase this as Should not all users to project onto a subset of fields of a struct used in a list type? That would make what's being tested a bit more clear (at least for me) from the get go.

boolean elementProjectable = !projectedList.elementType().isNestedType() ||
projectedList.elementType().equals(originalList.elementType());
Preconditions.checkArgument(elementProjectable,
"Cannot project a partial list element RowData. Trying to project %s out of %s",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See note below about this exception message.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here I trying to keep this message same as StructLikeProjection. I feel this msg is ok, What do you think?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private static RowData.FieldGetter createFieldGetter(RowType rowType,
Types.StructType rowStruct,
Types.NestedField projectField) {
for (int i = 0; i < rowStruct.fields().size(); i++) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this loop find essentially results in n^2 complexity. We can use this API from StructType.

    public NestedField field(int id) 

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we not only need to found the row field which field id equal to project field id, but also need to know the position of the match field. Even if we can get the match row field by StructType.field(int id), we also need to traverse the rowStruct to found out the field position again.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. Can we iterate through the schema once and set up the mapping btw field id and position id? I have a little performance concern of n^2 complexity for table with a lot of columns (like thousands or more).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1. There are tables with very high cardinality where this will potentially have a real performance impact. This tends to be especially true for base tables (raw ingested data events from clients etc), which often have very wide schemas and is also an area where Flink is pretty commonly used.

Anything that can be done to reduce this overhead would be great.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that is a great idea, let's do this~

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a little performance concern of n^2 complexity for table with a lot of columns (like thousands or more)

I'm fine with either. Because the complexity is actually n*m, let's say the n is the table's field number and m is the projection fields number. If both @stevenzwu and @kbendick think it's necessary to do, I'm okay with it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a little performance concern of n^2 complexity for table with a lot of columns (like thousands or more)

I'm fine with either. Because the complexity is actually n*m, let's say the n is the table's field number and m is the projection fields number. If both @stevenzwu and @kbendick think it's necessary to do, I'm okay with it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I construct a fieldIdToPosition map and use StructType.field(int id) to find the row field. Now the complexity reduce to n, I think the performance will not be a problem.

@openinx
Copy link
Member

openinx commented Oct 26, 2021

@Reo-LEI Could you take a look for the checkstyle issue ?

Error: eckstyle] [ERROR] /home/runner/work/iceberg/iceberg/flink/src/main/java/org/apache/iceberg/flink/data/RowDataProjection.java:32:8: Unused import - org.apache.iceberg.StructLike. [UnusedImports]

@openinx
Copy link
Member

openinx commented Oct 26, 2021

I'd like to take a look for this PR today, I think it's critical important feature for our flink users to read the v2 table. Thanks @Reo-LEI for picking up this PR !

@Reo-LEI
Copy link
Contributor Author

Reo-LEI commented Oct 26, 2021

@Reo-LEI Could you take a look for the checkstyle issue ?

Error: eckstyle] [ERROR] /home/runner/work/iceberg/iceberg/flink/src/main/java/org/apache/iceberg/flink/data/RowDataProjection.java:32:8: Unused import - org.apache.iceberg.StructLike. [UnusedImports]

Sure, I will fix this latter.

RowType nestedRowType = (RowType) rowType.getTypeAt(i);
int rowPos = i;
return row -> {
RowData nestedRow = row.isNullAt(rowPos) ? null : row.getRow(rowPos, nestedRowType.getFieldCount());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Q: If the nestedRow is null, do we still need to traverse the nested fields by using the RowDataProjection#project ? I think we can just return the null for the projection value ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had a small patch for this:

diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/RowDataProjection.java b/flink/src/main/java/org/apache/iceberg/flink/data/RowDataProjection.java
index 9d1e8ea67..25a5b3ab3 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/data/RowDataProjection.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/data/RowDataProjection.java
@@ -45,7 +45,11 @@ public class RowDataProjection implements RowData {
    * @return a wrapper to project rows
    */
   public static RowDataProjection create(Schema schema, Schema projectedSchema) {
-    return new RowDataProjection(FlinkSchemaUtil.convert(schema), schema.asStruct(), projectedSchema.asStruct());
+    return RowDataProjection.create(FlinkSchemaUtil.convert(schema), schema.asStruct(), projectedSchema.asStruct());
+  }
+
+  public static RowDataProjection create(RowType rowType, Types.StructType schema, Types.StructType projectedSchema) {
+    return new RowDataProjection(rowType, schema, projectedSchema);
   }
 
   private final RowData.FieldGetter[] getters;
@@ -73,9 +77,14 @@ public class RowDataProjection implements RowData {
             RowType nestedRowType = (RowType) rowType.getTypeAt(i);
             int rowPos = i;
             return row -> {
-              RowData nestedRow = row.isNullAt(rowPos) ? null : row.getRow(rowPos, nestedRowType.getFieldCount());
-              return new RowDataProjection(nestedRowType, rowField.type().asStructType(),
-                  projectField.type().asStructType()).wrap(nestedRow);
+              if (row.isNullAt(rowPos)) {
+                return null;
+              } else {
+                RowData nestedRow = row.getRow(rowPos, nestedRowType.getFieldCount());
+                return RowDataProjection
+                    .create(nestedRowType, rowField.type().asStructType(), projectField.type().asStructType())
+                    .wrap(nestedRow);
+              }
             };
 
           case MAP:

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we could not return null when the nestedRow is null. Because StructProjection will still project the nested struct even if the nested struct is null. If we return null here, the unittest will fail, because the expected record is not null but actual row data is null.

Assert.assertTrue("expected Record and actual RowData should be both null or not null",


// Project the RowData to remove the extra meta columns.
if (!projectedSchema.sameSchema(deletes.requiredSchema())) {
RowDataProjection rowDataProjection = RowDataProjection.create(deletes.requiredSchema(), projectedSchema);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see the RowDataProjection#create does a FlinkSchemaUtil.convert(schema) for the required schema to project, and I believe the FlinkDeleteFilter also did the same thing inside. I think we can reuse the converted flink row type between them.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point! Now I get the row type from FlinkDeleteFilter and pass it to RowDataProjection.

return this;
}

public Object getValue(int pos) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: this can be a private method, right ?

}
}
}
throw new IllegalArgumentException(String.format("Cannot find field %s in %s", projectField, rowStruct));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I think we need a more clear message for this exception: Cannot locate the project field <%s> in the iceberg struct <%s>

for (int i = 0; i < rowStruct.fields().size(); i++) {
Types.NestedField rowField = rowStruct.fields().get(i);
if (rowField.fieldId() == projectField.fieldId()) {
Preconditions.checkArgument(rowField.type().typeId() == projectField.type().typeId(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: this can be simplified by the following lines as the Preconditions.checkArgument can format the error message directly.

        Preconditions.checkArgument(rowField.type().typeId() == projectField.type().typeId(),
            "Different iceberg type between row field <%s> and project field <%s>",
            rowField, projectField);

RowType nestedRowType = (RowType) rowType.getTypeAt(i);
int rowPos = i;
return row -> {
RowData nestedRow = row.isNullAt(rowPos) ? null : row.getRow(rowPos, nestedRowType.getFieldCount());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had a small patch for this:

diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/RowDataProjection.java b/flink/src/main/java/org/apache/iceberg/flink/data/RowDataProjection.java
index 9d1e8ea67..25a5b3ab3 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/data/RowDataProjection.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/data/RowDataProjection.java
@@ -45,7 +45,11 @@ public class RowDataProjection implements RowData {
    * @return a wrapper to project rows
    */
   public static RowDataProjection create(Schema schema, Schema projectedSchema) {
-    return new RowDataProjection(FlinkSchemaUtil.convert(schema), schema.asStruct(), projectedSchema.asStruct());
+    return RowDataProjection.create(FlinkSchemaUtil.convert(schema), schema.asStruct(), projectedSchema.asStruct());
+  }
+
+  public static RowDataProjection create(RowType rowType, Types.StructType schema, Types.StructType projectedSchema) {
+    return new RowDataProjection(rowType, schema, projectedSchema);
   }
 
   private final RowData.FieldGetter[] getters;
@@ -73,9 +77,14 @@ public class RowDataProjection implements RowData {
             RowType nestedRowType = (RowType) rowType.getTypeAt(i);
             int rowPos = i;
             return row -> {
-              RowData nestedRow = row.isNullAt(rowPos) ? null : row.getRow(rowPos, nestedRowType.getFieldCount());
-              return new RowDataProjection(nestedRowType, rowField.type().asStructType(),
-                  projectField.type().asStructType()).wrap(nestedRow);
+              if (row.isNullAt(rowPos)) {
+                return null;
+              } else {
+                RowData nestedRow = row.getRow(rowPos, nestedRowType.getFieldCount());
+                return RowDataProjection
+                    .create(nestedRowType, rowField.type().asStructType(), projectField.type().asStructType())
+                    .wrap(nestedRow);
+              }
             };
 
           case MAP:

private static RowData.FieldGetter createFieldGetter(RowType rowType,
Types.StructType rowStruct,
Types.NestedField projectField) {
for (int i = 0; i < rowStruct.fields().size(); i++) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a little performance concern of n^2 complexity for table with a lot of columns (like thousands or more)

I'm fine with either. Because the complexity is actually n*m, let's say the n is the table's field number and m is the projection fields number. If both @stevenzwu and @kbendick think it's necessary to do, I'm okay with it.

private static RowData.FieldGetter createFieldGetter(RowType rowType,
Types.StructType rowStruct,
Types.NestedField projectField) {
for (int i = 0; i < rowStruct.fields().size(); i++) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a little performance concern of n^2 complexity for table with a lot of columns (like thousands or more)

I'm fine with either. Because the complexity is actually n*m, let's say the n is the table's field number and m is the projection fields number. If both @stevenzwu and @kbendick think it's necessary to do, I'm okay with it.

boolean valueProjectable = !projectedMap.valueType().isNestedType() ||
projectedMap.valueType().equals(originalMap.valueType());
Preconditions.checkArgument(keyProjectable && valueProjectable,
"Cannot project a partial map key or value RowData. Trying to project %s out of %s",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should say Cannot project a partial map key or value with non-primitive type, Trying .., the assert failure does not mean it's necessary to be a RowData, it can be other data types such as list or map etc.

@Reo-LEI
Copy link
Contributor Author

Reo-LEI commented Oct 27, 2021

I adressed some comment and leave some comment to discuss. I think you can take another looks of this PR. @rdblue @openinx @stevenzwu @kbendick 😄

@openinx
Copy link
Member

openinx commented Nov 1, 2021

Let me take another look today ! Thanks @Reo-LEI for the updating.

/**
* Creates a projecting wrapper for {@link RowData} rows.
* <p>
* This projection does not work with repeated types like lists and maps.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This projection does not work with repeated types like lists and maps with nested children types ? I think it works fine for lists/maps with primitive children types.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to say: Projecting a partial map key or value with non-primitive type does not work in this projection wrapper

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The meaning of this comment is exactly what you said that the projection will not project the nested children types of repeated types. I will rephrase it.

/**
* Creates a projecting wrapper for {@link RowData} rows.
* <p>
* This projection does not work with repeated types like lists and maps.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto.

Comment on lines 79 to 82
if (rowField == null) {
throw new IllegalArgumentException(String.format(
"Cannot locate the project field <%s> in the iceberg struct <%s>", projectField, rowStruct));
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit:

      Preconditions.checkNotNull(rowField,
          "Cannot locate the project field <%s> in the iceberg struct <%s>", projectField, rowStruct);

Copy link
Member

@openinx openinx left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me overall, thanks @Reo-LEI for the great contribution and thanks @stevenzwu for the double check. I left several minor comments.

@Reo-LEI
Copy link
Contributor Author

Reo-LEI commented Nov 1, 2021

I addressed the rest of comments just now, and you can check this again @openinx. And thanks @openinx @stevenzwu @kbendick @rdblue for review,

@kbendick
Copy link
Contributor

kbendick commented Nov 1, 2021

Sorry for missing some pings. Was out of office for a few weeks a while back and have still been playing a bit of catch up.

Please feel free to message me on slack if it's urgent btw. But retroactive +1. 🙂

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Encounter exceptions when query the iceberg table filled with change logs by using flink SQL

5 participants